import { Channel } from 'amqplib';
import { EventEmitter } from 'events';
import { ConnectionPool } from './connection-pool';
import { Message } from './message';
import { MessageSerializer, ProducerOptions, PublishOptions, ExtendedChannel, Logger, MessageError, ManagedConnection } from '@/types';
import { JSONSerializer } from './serializers';
import { createLogger } from '@/utils/logger';

/**
 * Base producer class for publishing messages
 */
export class Producer extends EventEmitter {
  protected readonly connectionPool: ConnectionPool;
  protected readonly serializer: MessageSerializer;
  protected readonly logger: Logger;
  protected channel?: ExtendedChannel;
  protected connection?: ManagedConnection;

  constructor(
    connectionPool: ConnectionPool,
    serializer: MessageSerializer = new JSONSerializer(),
    logger?: Logger
  ) {
    super();
    this.connectionPool = connectionPool;
    this.serializer = serializer;
    this.logger = logger ?? createLogger('Producer');
  }

  /**
   * Initialize the producer
   */
  async initialize(): Promise<void> {
    try {
      this.connection = await this.connectionPool.getConnection();
      this.channel = this.connection.channel;

      this.logger.info('Producer initialized');
      this.emit('initialized');
    } catch (error) {
      this.logger.error('Failed to initialize producer', error as Error);
      throw new MessageError(`Producer initialization failed: ${error}`);
    }
  }

  /**
   * Send a message
   */
  async send(message: Message, options: ProducerOptions = {}): Promise<boolean> {
    if (!this.channel) {
      throw new MessageError('Producer not initialized. Call initialize() first.');
    }

    try {
      const serializedMessage = this.serializer.serialize(message.toObject());
      const publishOptions = this.buildPublishOptions(message, options);

      const success = this.channel!.publish(
        options.exchange || '',
        options.routingKey || '',
        Buffer.from(serializedMessage),
        publishOptions
      );

      if (success) {
        this.logger.debug(`Message ${message.id} sent successfully`);
        this.emit('messageSent', message, options);
      } else {
        this.logger.warn(`Message ${message.id} could not be sent (channel buffer full)`);
        this.emit('messageBuffered', message, options);
      }

      return success;
    } catch (error) {
      this.logger.error(`Failed to send message ${message.id}`, error as Error);
      this.emit('messageError', message, error);
      throw new MessageError(`Failed to send message: ${error}`);
    }
  }

  /**
   * Send multiple messages in batch
   */
  async sendBatch(messages: Message[], options: ProducerOptions = {}): Promise<boolean[]> {
    const results: boolean[] = [];

    for (const message of messages) {
      try {
        const result = await this.send(message, options);
        results.push(result);
      } catch (error) {
        this.logger.error(`Failed to send message ${message.id} in batch`, error as Error);
        results.push(false);
      }
    }

    return results;
  }

  /**
   * Build publish options from message and producer options
   */
  protected buildPublishOptions(
    message: Message,
    options: ProducerOptions
  ): PublishOptions {
    // Filter out undefined values from headers
    const messageHeaders: Record<string, string | number | boolean> = {};
    if (message.headers) {
      Object.entries(message.headers).forEach(([key, value]) => {
        if (value !== undefined) {
          messageHeaders[key] = value;
        }
      });
    }

    const publishOptions: PublishOptions = {
      messageId: message.id,
      timestamp: message.timestamp,
      headers: messageHeaders,
      persistent: options.persistent ?? true,
    };

    // Add optional properties
    if (options.priority !== undefined) {
      publishOptions.priority = options.priority;
    }

    if (options.expiration !== undefined) {
      publishOptions.expiration = options.expiration.toString();
    }

    if (options.headers) {
      // Filter out undefined values from options headers
      const optionsHeaders: Record<string, string | number | boolean> = {};
      Object.entries(options.headers).forEach(([key, value]) => {
        if (value !== undefined) {
          optionsHeaders[key] = value;
        }
      });
      publishOptions.headers = { ...publishOptions.headers, ...optionsHeaders };
    }

    // Add message properties
    Object.assign(publishOptions, message.properties);

    return publishOptions;
  }

  /**
   * Close the producer
   */
  async close(): Promise<void> {
    if (this.channel) {
      try {
        await this.channel.close();
        this.logger.info('Producer closed');
        this.emit('closed');
      } catch (error) {
        this.logger.error('Error closing producer', error as Error);
      } finally {
        this.channel = undefined;
      }
    }

    // 返回连接到连接池
    if (this.connection) {
      try {
        await this.connectionPool.returnConnection(this.connection);
        this.logger.debug('Connection returned to pool');
      } catch (error) {
        this.logger.error('Error returning connection to pool', error as Error);
      } finally {
        this.connection = undefined;
      }
    }
  }
}

/**
 * Reliable producer with confirmation support
 */
export class ReliableProducer extends Producer {
  private confirmDelivery: boolean;

  constructor(
    connectionPool: ConnectionPool,
    confirmDelivery = true,
    serializer: MessageSerializer = new JSONSerializer(),
    logger?: Logger
  ) {
    super(connectionPool, serializer, logger);
    this.confirmDelivery = confirmDelivery;
  }

  /**
   * Initialize the reliable producer
   */
  async initialize(): Promise<void> {
    await super.initialize();

    if (this.confirmDelivery && this.channel) {
      await (this.channel as any).confirmSelect();
      this.logger.info('Producer confirmation mode enabled');
    }
  }

  /**
   * Send a message with confirmation
   */
  async send(message: Message, options: ProducerOptions = {}): Promise<boolean> {
    if (!this.channel) {
      await this.initialize();
    }

    try {
      const serializedMessage = this.serializer.serialize(message.toObject());
      const publishOptions = this.buildPublishOptions(message, options);

      if (this.confirmDelivery) {
        // Use sendToQueue or publish with confirmation
        if (options.exchange) {
          await new Promise<void>((resolve, reject) => {
            (this.channel as any).publish(
              options.exchange!,
              options.routingKey || '',
              Buffer.from(serializedMessage),
              publishOptions,
              (error: any) => {
                if (error) {
                  reject(error);
                } else {
                  resolve();
                }
              }
            );
          });
        } else {
          await new Promise<void>((resolve, reject) => {
            if (!this.channel) {
              reject(new Error('Channel is not available'));
              return;
            }

            this.channel.sendToQueue?.(
              options.routingKey || '',
              Buffer.from(serializedMessage),
              publishOptions,
              (error: Error | null) => {
                if (error) {
                  reject(error);
                } else {
                  resolve();
                }
              }
            );
          });
        }

        this.logger.debug(`Message ${message.id} confirmed`);
        this.emit('messageConfirmed', message, options);
        return true;
      } else {
        return await super.send(message, options);
      }
    } catch (error) {
      this.logger.error(`Failed to send message ${message.id} with confirmation`, error as Error);
      this.emit('messageError', message, error);
      throw new MessageError(`Failed to send message with confirmation: ${error}`);
    }
  }

  protected buildPublishOptions(
    message: Message,
    options: ProducerOptions
  ): Record<string, unknown> {
    // Filter out undefined values from headers
    const messageHeaders: Record<string, string | number | boolean> = {};
    if (message.headers) {
      Object.entries(message.headers).forEach(([key, value]) => {
        if (value !== undefined) {
          messageHeaders[key] = value;
        }
      });
    }

    const publishOptions: Record<string, unknown> = {
      messageId: message.id,
      timestamp: message.timestamp,
      headers: messageHeaders,
      persistent: options.persistent ?? true,
    };

    // Add optional properties
    if (options.priority !== undefined) {
      publishOptions.priority = options.priority;
    }

    if (options.expiration !== undefined) {
      publishOptions.expiration = options.expiration.toString();
    }

    if (options.headers) {
      // Filter out undefined values from options headers
      const optionsHeaders: Record<string, string | number | boolean> = {};
      Object.entries(options.headers).forEach(([key, value]) => {
        if (value !== undefined) {
          optionsHeaders[key] = value;
        }
      });

      const currentHeaders = publishOptions.headers as Record<string, string | number | boolean>;
      publishOptions.headers = { ...currentHeaders, ...optionsHeaders };
    }

    // Add message properties
    Object.assign(publishOptions, message.properties);

    return publishOptions;
  }
}

/**
 * Transaction producer for atomic message publishing
 */
export class TransactionProducer extends Producer {
  private inTransaction = false;

  /**
   * Begin a transaction
   */
  async beginTransaction(): Promise<void> {
    if (!this.channel) {
      await this.initialize();
    }

    if (this.inTransaction) {
      throw new MessageError('Transaction already in progress');
    }

    await this.channel?.txSelect?.();
    this.inTransaction = true;
    this.logger.debug('Transaction started');
    this.emit('transactionStarted');
  }

  /**
   * Commit the transaction
   */
  async commitTransaction(): Promise<void> {
    if (!this.inTransaction) {
      throw new MessageError('No transaction in progress');
    }

    await this.channel?.txCommit?.();
    this.inTransaction = false;
    this.logger.debug('Transaction committed');
    this.emit('transactionCommitted');
  }

  /**
   * Rollback the transaction
   */
  async rollbackTransaction(): Promise<void> {
    if (!this.inTransaction) {
      throw new MessageError('No transaction in progress');
    }

    await this.channel?.txRollback?.();
    this.inTransaction = false;
    this.logger.debug('Transaction rolled back');
    this.emit('transactionRolledBack');
  }

  /**
   * Execute a function within a transaction
   */
  async withTransaction<T>(fn: () => Promise<T>): Promise<T> {
    await this.beginTransaction();

    try {
      const result = await fn();
      await this.commitTransaction();
      return result;
    } catch (error) {
      await this.rollbackTransaction();
      throw error;
    }
  }
}

/**
 * Utility functions for creating producers
 */
export const createProducer = (
  connectionPool: ConnectionPool,
  serializer?: MessageSerializer,
  logger?: Logger
): Producer => {
  return new Producer(connectionPool, serializer, logger);
};

export const createReliableProducer = (
  connectionPool: ConnectionPool,
  confirmDelivery = true,
  serializer?: MessageSerializer,
  logger?: Logger
): ReliableProducer => {
  return new ReliableProducer(connectionPool, confirmDelivery, serializer, logger);
};

export const createTransactionProducer = (
  connectionPool: ConnectionPool,
  serializer?: MessageSerializer,
  logger?: Logger
): TransactionProducer => {
  return new TransactionProducer(connectionPool, serializer, logger);
};
